// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
//
// Licensed under the BSD 3-Clause License (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of the
// License at
//
// https://opensource.org/licenses/BSD-3-Clause
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "flare/rpc/protocol/protobuf/proto_over_http_protocol.h"

#include <string>
#include <unordered_map>
#include <utility>

#include "google/protobuf/text_format.h"
#include "google/protobuf/util/json_util.h"

#include "flare/base/buffer/zero_copy_stream.h"
#include "flare/base/down_cast.h"
#include "flare/base/enum.h"
#include "flare/base/string.h"
#include "flare/rpc/protocol/protobuf/call_context.h"
#include "flare/rpc/protocol/protobuf/call_context_factory.h"
#include "flare/rpc/protocol/protobuf/detail/dirty_http.h"
#include "flare/rpc/protocol/protobuf/gdt_json_proto_conversion.h"
#include "flare/rpc/protocol/protobuf/message.h"
#include "flare/rpc/protocol/protobuf/rpc_meta.pb.h"
#include "flare/rpc/protocol/protobuf/rpc_options.h"
#include "flare/rpc/protocol/protobuf/service_method_locator.h"

using namespace std::literals;

namespace flare::protobuf {

static const auto kUriPrefix = "/rpc/"s;

// To be pedantic, `Content-Type`s are NOT case sensitive. For the moment we
// don't take that into consideration. In real world I'm not aware of any
// clients using non-lowercased type names. Beside, handling case insensitivity
// is costly.
//
// @sa: https://tools.ietf.org/html/rfc7231#section-3.1.1.1
//
// > The type, subtype, and parameter name tokens are case-insensitive.
// > Parameter values might or might not be case-sensitive, depending on the
// > semantics of the parameter name.

// We take special path for this `Content-Type`. `charset` is recognized.
constexpr auto kContentTypeApplicationJson = "application/json";  // GDT style.

// For `Content-Type`s below, `Content-Type` specified in HTTP header must be an
// exact match.
constexpr auto kContentTypeProto3Json =
    "application/x-proto3-json";  // `text/`? Protocol Buffers 3 style.
constexpr auto kContentTypeProtobuf = "application/x-protobuf";
constexpr auto kContentTypeProtobufText = "text/x-protobuf";

// JSON format defined by `common/encoding/proto_json_format.h`.
FLARE_RPC_REGISTER_CLIENT_SIDE_STREAM_PROTOCOL_ARG(
    "http+gdt-json", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kApplicationJson, false);
FLARE_RPC_REGISTER_SERVER_SIDE_STREAM_PROTOCOL_ARG(
    "http+gdt-json", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kApplicationJson, true);

// JSON format defined by `thirdparty/protobuf/util/json_util.h`.
FLARE_RPC_REGISTER_CLIENT_SIDE_STREAM_PROTOCOL_ARG(
    "http+proto3-json", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kProto3Json, false);
FLARE_RPC_REGISTER_SERVER_SIDE_STREAM_PROTOCOL_ARG(
    "http+proto3-json", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kProto3Json, true);

// Text generated by `protobuf::Message::ShortDebugString()`.
FLARE_RPC_REGISTER_CLIENT_SIDE_STREAM_PROTOCOL_ARG(
    "http+pb-text", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kDebugString, false);
FLARE_RPC_REGISTER_SERVER_SIDE_STREAM_PROTOCOL_ARG(
    "http+pb-text", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kDebugString, true);

// Binary generated by `protobuf::Message::SerializeToString()`.
FLARE_RPC_REGISTER_CLIENT_SIDE_STREAM_PROTOCOL_ARG(
    "http+pb", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kProtobuf, false);
FLARE_RPC_REGISTER_SERVER_SIDE_STREAM_PROTOCOL_ARG(
    "http+pb", ProtoOverHttpProtocol,
    ProtoOverHttpProtocol::ContentType::kProtobuf, true);

namespace {

// This message is generated by `TryCutMessage`, and will soon be fully parsed
// by `TryParse`.
class OnWireMessage : public Message {
 public:
  OnWireMessage(PooledPtr<rpc::RpcMeta> meta, const NoncontiguousBuffer& body)
      : meta_(std::move(meta)), body_(std::move(body)) {
    SetRuntimeTypeTo<OnWireMessage>();
  }

  std::uint64_t GetCorrelationId() const noexcept override {
    return meta_->correlation_id();
  }

  Type GetType() const noexcept override {
    return FromWireType(meta_->method_type(), meta_->flags());
  }

  PooledPtr<rpc::RpcMeta>* GetMeta() { return &meta_; }
  const NoncontiguousBuffer& GetBody() const { return body_; }

 private:
  PooledPtr<rpc::RpcMeta> meta_;
  NoncontiguousBuffer body_;
};

// Copied from `common/rpc/rpc_error_code.cc`, with minimal changes.
const char* StatusToString(int status) {
  switch (static_cast<rpc::Status>(status)) {
    case rpc::STATUS_SUCCESS:
      return "success";
    case rpc::STATUS_CHANNEL_SHUTDOWN:
      return "ChannelShutdown";
    case rpc::STATUS_FAIL_TO_CONNECT:
      return "FailToConnect";
    case rpc::STATUS_TIMEOUT:
      return "Timeout";
    case rpc::STATUS_SERIALIZE_REQUEST:
      return "SerializeRequestError";
    case rpc::STATUS_PARSE_REQUEST:
      return "ParseRequestError";
    case rpc::STATUS_SERIALIZE_RESPONSE:
      return "SerializeResponseError";
    case rpc::STATUS_PARSE_RESPONSE:
      return "ParseResponseError";
    case rpc::STATUS_INVALID_METHOD_NAME:
      return "InvalidMethodName";
    case rpc::STATUS_SERVICE_NOT_FOUND:
      return "ServiceNotFound";
    case rpc::STATUS_METHOD_NOT_FOUND:
      return "MethodNotFound";
    case rpc::STATUS_OVERLOADED:
      return "ServerOverload";
    case rpc::STATUS_INVALID_TRANSFER_MODE:
      return "InvalidTransferMode";
    case rpc::STATUS_FROM_USER_FEEDBACK:
      return "ErrorFromUserFeedback";
    case rpc::STATUS_OUT_OF_SERVICE:
      return "OutOfService";
    case rpc::STATUS_GET_ROUTE:
      return "ErrorGetRoute";
    case rpc::STATUS_NO_PEER:  // STATUS_GET_ROUTE_EMPTY
      return "GetRouteEmpty";
    case rpc::STATUS_GET_ROUTE_ALL_DISABLED:
      return "GetRouteAllDisabled";
    case rpc::STATUS_FAILED:
      return "Unknown";
    case rpc::STATUS_FROM_USER:
      return "UserError";
    default:
      return "<Unknown>";
  }
}

std::string EncodeStreamTrailer(std::uint32_t status, const std::string& desc) {
  // `gdt::RpcErrorCodeString` expects `gdt::RpcStatus` instead of
  // `rpc::Status`. Since we carefully chosen `rpc::Status` to make them
  // numerical equal to `gdt::RpcStatus`, this should work.
  auto rc = kRpcHttpHeaderErrorCode + ": "s + StatusToString(status);
  if (status != rpc::STATUS_SUCCESS) {
    // Code in `gdt::` cannot handle "Rpc-Error-Reason" if the call succeeded,
    // so we only attach a reason if the call fail.
    if (!desc.empty()) {
      rc += "\r\n"s + kRpcHttpHeaderErrorReason + ": " + desc;
    }
  }
  return rc + "\r\n";
}

void AppendChunk(const NoncontiguousBuffer& body,
                 NoncontiguousBufferBuilder* builder) {
  builder->Append(Format("{:x}", body.ByteSize()));
  builder->Append("\r\n"sv);
  builder->Append(body);
  builder->Append("\r\n"sv);
}

void UpdateRpcMetaFromTrailer(std::string_view trailer, rpc::RpcMeta* meta) {
  static const auto kPrefixSize =
      std::string_view(kRpcHttpHeaderErrorCode).size() + 2;
  auto&& resp = meta->mutable_response_meta();
  if (trailer.size() < kPrefixSize) {
    resp->set_status(rpc::STATUS_FAILED);
    resp->set_description(fmt::format("Invalid HTTP trailer [{}].", trailer));
    return;
  }
  trailer = trailer.substr(kPrefixSize);  // Remove prefix.
  if (trailer != StatusToString(rpc::STATUS_SUCCESS)) {
    auto parts = Split(trailer, ":", true);
    resp->set_status(rpc::STATUS_FROM_USER);
    resp->set_description(
        std::string(parts.size() > 1 ? Trim(parts[1]) : trailer));
  } else {
    resp->set_status(rpc::STATUS_SUCCESS);
  }
}

// Per spec, `charset` is allowed for `application/json`. This method tests if
// the given type string represents `application/json` with UTF-8 `charset`.
// (For the moment we don't support UTF-16 or UTF-32, they're not widely used.)
bool IsContentTypeApplicationJsonSlow(std::string_view type) {
  constexpr auto kApplicationJsonSemicolon = "application/json;"sv;
  if (!StartsWith(type, kApplicationJsonSemicolon)) {
    return false;
  }
  auto rest = ToLower(Trim(type.substr(kApplicationJsonSemicolon.size())));
  constexpr auto kCharset = "charset="sv;
  if (!StartsWith(rest, kCharset)) {
    return false;
  }
  auto charset = rest.substr(kCharset.size());
  return charset == "utf-8";
}

// gdt::HttpDecodeChunked does not interoperate with `NoncontiguousBuffer` well,
// so we roll our own.
//
// Returns -1 on error, 0 if more data is needed, positive on success.
int ReadNextChunk(NoncontiguousBuffer* nb, NoncontiguousBuffer* chunk,
                  std::string* trailer) {
  auto line = FlattenSlowUntil(*nb, "\r\n");
  if (!EndsWith(line, "\r\n")) {
    return 0;
  }
  const size_t kCrLfSize = 2;
  auto pos = line.find_first_of(';');
  if (pos == std::string::npos) {
    pos = line.find_first_of('\r');
  }
  auto size_opt = TryParse<int>(std::string_view(line).substr(0, pos), 16);
  if (!size_opt) {
    FLARE_LOG_WARNING_EVERY_SECOND("Parse chunked failed: {}", line);
    return -1;
  }
  auto size = *size_opt;

  if (!size) {  // Trailer follows then.
    // Given that the tailer shouldn't be long, this shouldn't hurt much.
    auto data = FlattenSlow(*nb);
    data.erase(0, line.size());
    if (data.find("\r\n") == 0) {  // No trailer.
      nb->Skip(line.size() + kCrLfSize);
      return 1;
    }
    if (auto pos = data.find("\r\n\r\n"); pos != std::string::npos) {
      size = pos + 4;  // \r\n\r\n
    }
    if (!size) {
      return 0;
    }
    nb->Skip(line.size());
    *trailer = FlattenSlow(*nb, size - 2 * kCrLfSize);
    nb->Skip(size);
    return 1;
  }

  if (line.size() + size + kCrLfSize > nb->ByteSize()) return 0;
  nb->Skip(line.size());  // This must be done below the above test, otherwise
                          // we'll leave `nb` in a non-consistent state.
  *chunk = nb->Cut(size);
  if (FlattenSlow(*nb, kCrLfSize) != "\r\n") {
    FLARE_LOG_WARNING_EVERY_SECOND(
        "Invalid chunked body: {}",
        line + FlattenSlow(*chunk) + FlattenSlow(*nb, kCrLfSize));
    return -1;
  }
  nb->Skip(kCrLfSize);
  return 1;
}

}  // namespace

static StreamProtocol::Characteristics characteristics_gdt_json = {
    .name = "Protocol Buffers (JSON)",
    .no_connection_reuse_in_streaming_rpcs = true};

static StreamProtocol::Characteristics characteristics_proto3_json = {
    .name = "Protocol Buffers (JSON, Proto3)",
    .no_connection_reuse_in_streaming_rpcs = true};

static StreamProtocol::Characteristics characteristics_debug_str = {
    .name = "Protocol Buffers (Debug String)",
    .no_connection_reuse_in_streaming_rpcs = true};

static StreamProtocol::Characteristics characteristics_proto = {
    .name = "Protocol Buffers", .no_connection_reuse_in_streaming_rpcs = true};

ProtoOverHttpProtocol::ProtoOverHttpProtocol(ContentType content_type,
                                             bool server_side)
    : content_type_(content_type), server_side_(server_side) {
  static const std::unordered_map<
      ContentType, std::pair<std::string, const Characteristics*>>
      kExpectedContentTypes = {
          {ContentType::kApplicationJson,
           {kContentTypeApplicationJson, &characteristics_gdt_json}},
          {ContentType::kProto3Json,
           {kContentTypeProto3Json, &characteristics_proto3_json}},
          {ContentType::kDebugString,
           {kContentTypeProtobufText, &characteristics_debug_str}},
          {ContentType::kProtobuf,
           {kContentTypeProtobuf, &characteristics_proto}}};
  std::tie(expecting_content_type_, characteristics_) =
      kExpectedContentTypes.at(content_type_);
}

const StreamProtocol::Characteristics&
ProtoOverHttpProtocol::GetCharacteristics() const {
  return *characteristics_;
}

const MessageFactory* ProtoOverHttpProtocol::GetMessageFactory() const {
  return &error_message_factory;
}

const ControllerFactory* ProtoOverHttpProtocol::GetControllerFactory() const {
  return &passive_call_context_factory;
}

StreamProtocol::MessageCutStatus ProtoOverHttpProtocol::TryCutMessage(
    NoncontiguousBuffer* buffer, std::unique_ptr<Message>* message) {
  if (current_stream_) {  // A stream is being parsed.
    return TryKeepParsingStream(buffer, message);
  }

  if (server_side_) {  // No need to do protocol detection on client side.
    static constexpr auto kSignature = "POST /rpc/"sv;
    if (buffer->ByteSize() < kSignature.size()) {
      return MessageCutStatus::NotIdentified;
    }
    if (!StartsWith(FlattenSlow(*buffer, kSignature.size()), kSignature)) {
      return MessageCutStatus::ProtocolMismatch;
    }
  }

  // Let's pretend it's pb-over-http then.
  //
  // We don't have to be precise there, so long as we don't collide with other
  // protocols in detecting bytes on wire, it's fine.
  auto header = FlattenSlowUntil(*buffer, "\r\n\r\n");
  if (!EndsWith(header, "\r\n\r\n")) {
    return MessageCutStatus::NotIdentified;
  }

  // We only compare `Content-Type` if we're a server side protocol instance.
  // For client side, if the user wants us to parse the message, then the
  // message *should* be parseable by us anyway.
  //
  // This also helps us to parse messages responded by server that does not fill
  // `Content-Type` (presumably due to their QoI).
  if (server_side_) {
    auto content_type = TryGetHeaderRoughly(header, kContentType);
    if (FLARE_LIKELY(content_type == expecting_content_type_)) {
      // NOTHING.
    } else if (expecting_content_type_ == kContentTypeApplicationJson) {
      // `kContentTypeApplicationJson` is special in that it uses standard
      // content type `application/json`.
      //
      // Per spec, `charset` is allowed to be specified for this content type.
      // Therefore, in case the content-type isn't a exact-match, we need to do
      // extra check for possible `charset` at the tail.

      // This is likely to be an exact match if `charset` is specified by the
      // client. If this matches, we don't have to do heavy-lifting string parse
      // of `Content-Type`.
      constexpr auto kApplicationJsonCharsetUtf8 =
          "application/json; charset=utf-8"sv;
      if (FLARE_LIKELY(content_type == kApplicationJsonCharsetUtf8)) {
        // NOTHING.
      } else if (FLARE_LIKELY(IsContentTypeApplicationJsonSlow(content_type))) {
        // NOTHING.
      } else {
        return MessageCutStatus::ProtocolMismatch;
      }
    } else {
      return MessageCutStatus::ProtocolMismatch;
    }
  }

  // Translate the headers into `RpcMeta` first.
  auto meta = object_pool::Get<rpc::RpcMeta>();
  meta->set_correlation_id(
      TryGetHeaderRoughly<std::uint64_t>(header, kRpcHttpHeaderSeqNo)
          .value_or(0));  // If not provided, the caller shouldn't be
                          // multiplexing on this connection. In this case this
                          // ID is not significant.

  // We do not recognize header `Accept` here. Originally it's recognized and
  // respected when packing response, let's see if this behavior need to be
  // kept.

  if (auto rc = TryExtractRpcMeta(header, meta.Get());
      FLARE_UNLIKELY(rc != MetaParseStatus::Success)) {
    if (server_side_) {
      // Ideally if `rc` indicates invalid HTTP method (method other than POST)
      // we should return HTTP 405 to the user. This can be done by passing this
      // information via `EarlyErrorMessage`.
      *message = std::make_unique<EarlyErrorMessage>(
          meta->correlation_id(), rpc::STATUS_FAILED, "Invalid request.");
      return MessageCutStatus::Cut;
    } else {
      return MessageCutStatus::Error;
    }
  }

  auto body_size = TryGetHeaderRoughly<std::size_t>(header, kContentLength);
  if (FLARE_UNLIKELY(!body_size)) {
    // If it's transferred using `chunked` encoding, let's decode it as a
    // stream.
    if (TryGetHeaderRoughly(header, kTransferEncoding) == "chunked"sv) {
      return TryCutMessageFromChunkedEncoding(header, std::move(meta), buffer,
                                              message);
    }
    FLARE_LOG_WARNING_EVERY_SECOND("Missing essential HTTP header [{}].",
                                   "Content-Length");
    return MessageCutStatus::ProtocolMismatch;
  }

  if (buffer->ByteSize() < header.size() + *body_size) {
    return MessageCutStatus::NeedMore;
  }

  // Let's cut the message off then.
  buffer->Skip(header.size());
  auto body = buffer->Cut(*body_size);

  if (server_side_) {
    auto desc = ServiceMethodLocator::Instance()->TryGetMethodDesc(
        protocol_ids::standard, meta->request_meta().method_name());
    if (!desc) {
      *message = std::make_unique<EarlyErrorMessage>(
          meta->correlation_id(), rpc::STATUS_METHOD_NOT_FOUND,
          fmt::format("Method [{}] is not recognized.",
                      meta->request_meta().method_name()));
      return MessageCutStatus::Cut;
    }
    if (IsServerStreamingMethod(desc->method_desc)) {
      meta->set_method_type(rpc::METHOD_TYPE_STREAM);
      meta->set_flags(rpc::MESSAGE_FLAGS_START_OF_STREAM |
                      rpc::MESSAGE_FLAGS_END_OF_STREAM);
    } else {
      meta->set_method_type(rpc::METHOD_TYPE_SINGLE);
    }
  } else {
    meta->set_method_type(rpc::METHOD_TYPE_SINGLE);
  }

  *message = std::make_unique<OnWireMessage>(std::move(meta), std::move(body));
  return MessageCutStatus::Cut;
}

bool ProtoOverHttpProtocol::TryParse(std::unique_ptr<Message>* message,
                                     Controller* controller) {
  if (dyn_cast<EarlyErrorMessage>(message->get())) {
    return true;
  }
  auto ppm = cast<OnWireMessage>(message->get());
  auto&& meta = **ppm->GetMeta();
  auto&& body = ppm->GetBody();
  bool empty_body =
      // Message body (if any) is ignored if this is an error message.
      (!server_side_ && meta.response_meta().status() != rpc::STATUS_SUCCESS) ||
      // Last message in a stream can be empty, and serves solely as an EOS
      // marker.
      (body.Empty() && !!(meta.flags() & rpc::MESSAGE_FLAGS_END_OF_STREAM));

  if (empty_body) {
    *message = std::make_unique<ProtoMessage>(std::move(*ppm->GetMeta()),
                                              std::monostate{});
    return true;
  }

  auto unpack_to = TryGetUnpackingBuffer(
      meta, server_side_ ? nullptr : cast<ProactiveCallContext>(controller));
  if (FLARE_UNLIKELY(!unpack_to)) {
    if (server_side_) {
      // We always need a valid request message for server side, even if it's an
      // empty request.
      *message = std::make_unique<EarlyErrorMessage>(
          meta.correlation_id(), rpc::STATUS_METHOD_NOT_FOUND,
          "Method not found.");
      return true;
    } else {
      FLARE_CHECK(
          cast<ProactiveCallContext>(controller)->accept_response_in_bytes);
      // FIXME: Slow.
      *message = std::make_unique<ProtoMessage>(std::move(*ppm->GetMeta()),
                                                std::move(body));
      return true;
    }
  }

  if (!TryDeserialize(body, unpack_to.Get())) {
    return false;
  }
  *message = std::make_unique<ProtoMessage>(std::move(*ppm->GetMeta()),
                                            std::move(unpack_to));
  return true;
}

void ProtoOverHttpProtocol::WriteMessage(const Message& message,
                                         NoncontiguousBuffer* buffer,
                                         Controller* controller) {
  FLARE_LOG_ERROR_IF_ONCE(!controller->GetTracingContext().empty() ||
                              controller->IsTraceForciblySampled(),
                          "Passing tracing context is not supported by "
                          "Protocol-Buffers-over-HTTP protocol.");

  if (auto msg = dyn_cast<ProtoMessage>(&message)) {
    FLARE_LOG_ERROR_IF_ONCE(
        !msg->attachment.Empty(),
        "Attachment is not supported by HTTP protocol. Dropped silently.");
    if (server_side_) {
      if (msg->GetType() == Message::Type::Single) {
        return WriteResponse(*msg, buffer, controller);
      } else if (!!(msg->GetType() & Message::Type::Stream)) {
        bool start = !!(msg->GetType() & Message::Type::StartOfStream);
        bool end = !!(msg->GetType() & Message::Type::EndOfStream);

        if (start && end) {
          return WriteStreamSingle(*msg, buffer, controller);
        } else if (start) {
          return WriteStreamStart(*msg, buffer, controller);
        } else if (end) {
          return WriteStreamEnd(*msg, buffer, controller);
        } else {
          return WriteStreamContinue(*msg, buffer, controller);
        }
      }
      FLARE_CHECK(0, "Unexpected message type {}.",
                  underlying_value(msg->GetType()));
    } else {
      return WriteRequest(*msg, buffer, controller);
    }
  } else if (auto msg = dyn_cast<EarlyErrorMessage>(&message)) {
    return WriteError(*msg, buffer, controller);
  }
  FLARE_CHECK(0, "Unexpected message type [{}].", GetTypeName(message));
}

ProtoOverHttpProtocol::MessageCutStatus
ProtoOverHttpProtocol::TryCutMessageFromChunkedEncoding(
    const std::string& header, PooledPtr<rpc::RpcMeta> meta,
    NoncontiguousBuffer* buffer, std::unique_ptr<Message>* message) {
  if (server_side_) {  // Multiple-request-single-response was not supported
                       // by HTTP-based protocols.
    FLARE_LOG_WARNING_EVERY_SECOND(
        "Unexpected: `chunked` encoding is received on server side.");
    return MessageCutStatus::Error;
  }

  meta->set_method_type(rpc::METHOD_TYPE_STREAM);
  meta->set_flags(rpc::MESSAGE_FLAGS_START_OF_STREAM);

  // Unless `ReadNextChunk` would succeed, we don't want to touch `buffer`.
  auto buffer_copy = *buffer;
  buffer_copy.Skip(header.size());

  NoncontiguousBuffer body;
  std::string trailer;
  auto result = ReadNextChunk(&buffer_copy, &body, &trailer);
  if (result < 0) {
    return MessageCutStatus::Error;
  } else if (result == 0) {
    return MessageCutStatus::NeedMore;
  }

  // It's the first message in a stream, mark it.
  //
  // Note that we don't do this until we can be sure a message can be cut off.
  // Otherwise on next try to `TryCutMessage`, we won't have a chance to retry
  // `ReadNextChunk` above.
  current_stream_ = *meta;

  buffer->Skip(buffer->ByteSize() - buffer_copy.ByteSize());

  // In case the trailer is present in the first message, let's finish the
  // stream right away.
  if (!trailer.empty()) {
    // In case the trailer is attached to the first message, the streaming is
    // terminated right now (and it has failed.).
    meta->set_method_type(rpc::METHOD_TYPE_STREAM);
    meta->set_flags(rpc::MESSAGE_FLAGS_START_OF_STREAM |
                    rpc::MESSAGE_FLAGS_END_OF_STREAM);
    UpdateRpcMetaFromTrailer(trailer, meta.Get());

    // This does not make sense, as the connection is being closed anyway.
    // (For streaming RPC on HTTP, the connection is close once the call
    // completed, either successfully or erroneously.)
    current_stream_ = std::nullopt;

    *message =
        std::make_unique<OnWireMessage>(std::move(meta), NoncontiguousBuffer());
    return MessageCutStatus::Cut;
  }

  *message = std::make_unique<OnWireMessage>(std::move(meta), std::move(body));
  return MessageCutStatus::Cut;
}

ProtoOverHttpProtocol::MetaParseStatus ProtoOverHttpProtocol::TryExtractRpcMeta(
    const std::string& header, rpc::RpcMeta* meta) const {
  FLARE_CHECK(EndsWith(header, "\r\n\r\n"));
  std::string_view start_line =
      std::string_view(header).substr(0, header.find("\r\n"));
  if (server_side_) {
    FLARE_CHECK(StartsWith(start_line, "POST "));  // Done in `TryCutMessage`.
    auto uri = start_line.substr("POST "sv.size());
    uri = uri.substr(0, uri.find(" "));

    auto&& req_meta = *meta->mutable_request_meta();
    FLARE_CHECK(StartsWith(uri, kUriPrefix));
    req_meta.set_method_name(std::string(uri.substr(kUriPrefix.size())));
    if (auto timeout_opt = TryGetHeaderRoughly<std::uint64_t>(
            header, kRpcHttpHeaderRpcTimeout)) {
      req_meta.set_timeout(*timeout_opt);
    }
  } else {
    if (!StartsWith(start_line, "HTTP/1.1 ")) {
      return MetaParseStatus::kInvalidRequest;
    }
    auto status =
        flare::TryParse<int>(start_line.substr("HTTP/1.1 "sv.size(), 3));
    if (!status) {
      return MetaParseStatus::kInvalidRequest;
    }
    auto&& resp_meta = *meta->mutable_response_meta();
    if (*status != 200 /* OK */) {
      if (auto status_opt =
              TryGetHeaderRoughly<int>(header, kRpcHttpHeaderErrorCode)) {
        // Even if `*status_opt` is not a valid enumerator (test-able by
        // `rpc::Status_IsValid`), we keep it for compatibility between
        // different version of protocols.
        auto rc = static_cast<rpc::Status>(*status_opt);
        if (rc == rpc::STATUS_SUCCESS) {
          FLARE_LOG_WARNING_EVERY_SECOND(
              "Found successful RPC status in non-successful HTTP response.");
          return MetaParseStatus::kCriticalFieldMissing;
        }
        resp_meta.set_status(rc);
      } else {
        return MetaParseStatus::kCriticalFieldMissing;
      }
    } else {
      resp_meta.set_status(rpc::STATUS_SUCCESS);
    }
    if (auto s = TryGetHeaderRoughly(header, kRpcHttpHeaderErrorReason);
        !s.empty()) {
      resp_meta.set_description(std::string(s));
    }
  }
  return MetaParseStatus::Success;
}

MaybeOwning<google::protobuf::Message>
ProtoOverHttpProtocol::TryGetUnpackingBuffer(const rpc::RpcMeta& meta,
                                             ProactiveCallContext* ctx) {
  if (server_side_) {
    auto desc = ServiceMethodLocator::Instance()->TryGetMethodDesc(
        protocol_ids::standard, meta.request_meta().method_name());
    if (!desc) {
      FLARE_LOG_WARNING_EVERY_SECOND("Unrecognized method: [{}].",
                                     meta.request_meta().method_name());
      return nullptr;
    }
    return MaybeOwning(owning, desc->request_prototype->New());
  } else {
    if (ctx->accept_response_in_bytes) {
      return nullptr;
    } else {
      return ctx->GetOrCreateResponse();
    }
  }
  FLARE_CHECK(0);
}

bool ProtoOverHttpProtocol::TryDeserialize(
    const NoncontiguousBuffer& serialized,
    google::protobuf::Message* to) const {
  if (content_type_ == ContentType::kApplicationJson) {
    std::string error;
    auto rc = JsonToProtoMessage(FlattenSlow(serialized), to, &error);
    if (!rc) {
      FLARE_LOG_WARNING_EVERY_SECOND("Failed to parse message: {}", error);
    }
    return rc;
  } else if (content_type_ == ContentType::kProto3Json) {
    auto rc = google::protobuf::util::JsonStringToMessage(
        FlattenSlow(serialized), to);
    if (!rc.ok()) {
      FLARE_LOG_WARNING_EVERY_SECOND("Failed to parse message: {}",
                                     rc.ToString());
    }
    return rc.ok();
  } else if (content_type_ == ContentType::kDebugString) {
    auto rc = google::protobuf::TextFormat::ParseFromString(
        FlattenSlow(serialized), to);
    if (!rc) {
      FLARE_LOG_WARNING_EVERY_SECOND("Failed to parse message.");
    }
    return rc;
  } else if (content_type_ == ContentType::kProtobuf) {
    auto copy = serialized;
    NoncontiguousBufferInputStream nbis(&copy);
    auto rc = to->ParseFromZeroCopyStream(&nbis);
    if (!rc) {
      FLARE_LOG_WARNING_EVERY_SECOND("Failed to parse message.");
    }
    return rc;
  }
  FLARE_CHECK(0);
}

void ProtoOverHttpProtocol::WriteRequest(const ProtoMessage& message,
                                         NoncontiguousBuffer* buffer,
                                         Controller* controller) {
  auto&& meta = *message.meta;
  FLARE_CHECK(meta.has_request_meta());
  auto&& req_meta = meta.request_meta();

  auto body = SerializeMessage(message);

  // Let's build an HTTP request by hand.
  NoncontiguousBufferBuilder builder;
  builder.Append("POST /rpc/"sv);
  builder.Append(req_meta.method_name());
  builder.Append(" HTTP/1.1\r\n");
  builder.Append(
      Format("{}: {}\r\n", kRpcHttpHeaderSeqNo, meta.correlation_id()));
  if (req_meta.has_timeout()) {
    FLARE_CHECK(req_meta.timeout());
    builder.Append(
        Format("{}: {}\r\n", kRpcHttpHeaderRpcTimeout, req_meta.timeout()));
  }
  builder.Append(Format("Content-Type: {}\r\n", expecting_content_type_));
  builder.Append(Format("Content-Length: {}\r\n\r\n", body.ByteSize()));
  builder.Append(std::move(body));

  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteResponse(const ProtoMessage& message,
                                          NoncontiguousBuffer* buffer,
                                          Controller* controller) {
  static const std::unordered_map<int, std::pair<int, std::string>>
      kHttpStatusMapping = {
          {rpc::STATUS_SUCCESS, {200, "OK"}},
          {rpc::STATUS_METHOD_NOT_FOUND, {404, "Not Found"}},
          {rpc::STATUS_SERVICE_NOT_FOUND, {404, "Not Found"}},
      };

  auto&& meta = *message.meta;
  FLARE_CHECK(meta.has_response_meta());
  auto&& resp_meta = meta.response_meta();

  NoncontiguousBufferBuilder builder;

  // For non-successful responses, failure should be reflected on HTTP status
  // code.
  if (auto iter = kHttpStatusMapping.find(resp_meta.status());
      iter != kHttpStatusMapping.end()) {
    builder.Append(
        Format("HTTP/1.1 {} {}\r\n", iter->second.first, iter->second.second));
  } else {
    // All unknown status codes are translated to HTTP 500.
    builder.Append("HTTP/1.1 500 Internal Server Error\r\n"sv);
  }
  builder.Append(Format("Content-Type: {}\r\n", expecting_content_type_));
  builder.Append(
      Format("{}: {}\r\n", kRpcHttpHeaderSeqNo, meta.correlation_id()));

  if (resp_meta.status() == rpc::STATUS_SUCCESS) {
    auto body = SerializeMessage(message);
    builder.Append(Format("Content-Length: {}\r\n\r\n", body.ByteSize()));
    // `kRpcHttpHeaderErrorReason` is only required in failed responses.
    builder.Append(std::move(body));
  } else {
    builder.Append(
        Format("{}: {}\r\n", kRpcHttpHeaderErrorCode, resp_meta.status()));
    if (resp_meta.has_description()) {
      builder.Append(Format("{}: {}\r\n", kRpcHttpHeaderErrorReason,
                            resp_meta.description()));
    }
    builder.Append("Content-Length: 0\r\n\r\n"sv);
  }

  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteStreamSingle(const ProtoMessage& message,
                                              NoncontiguousBuffer* buffer,
                                              Controller* controller) {
  auto&& meta = *message.meta;
  FLARE_CHECK(meta.has_response_meta());
  auto&& resp_meta = meta.response_meta();

  NoncontiguousBufferBuilder builder;

  // Different from `RepackResponse`, for streaming RPCs, even if the RPC itself
  // failed, HTTP status code is still 200. This convention is a bit ... weird
  // (@sa: `common/rpc/rpc_stream_test.cc`.).
  builder.Append("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n"sv);
  builder.Append(
      Format("{}: {}\r\n", kRpcHttpHeaderSeqNo, meta.correlation_id()));
  builder.Append(Format("Trailer: {}\r\n", kRpcHttpHeaderErrorCode));
  builder.Append(Format("Content-Type: {}\r\n", expecting_content_type_));
  builder.Append("\r\n"sv);
  if (auto body = SerializeMessage(message); !body.Empty()) {
    AppendChunk(body, &builder);
  }
  builder.Append(
      Format("0\r\n{}\r\n",
             EncodeStreamTrailer(resp_meta.status(), resp_meta.description())));

  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteStreamStart(const ProtoMessage& message,
                                             NoncontiguousBuffer* buffer,
                                             Controller* controller) {
  auto&& meta = *message.meta;
  FLARE_CHECK(meta.has_response_meta());
  auto&& resp_meta = meta.response_meta();

  NoncontiguousBufferBuilder builder;
  builder.Append("HTTP/1.1 200 OK\r\n");
  builder.Append(Format("Content-Type: {}\r\n", expecting_content_type_));
  if (resp_meta.status() == rpc::STATUS_SUCCESS) {
    builder.Append(
        Format("{}: {}\r\n", kRpcHttpHeaderSeqNo, meta.correlation_id()));
    builder.Append("Transfer-Encoding: chunked\r\n"sv);
    builder.Append(Format("Trailer: {}\r\n\r\n", kRpcHttpHeaderErrorCode));
    // What if the message is serialized to empty buffer? I'd suspect the remote
    // side would be able to parse the response correctly.
    AppendChunk(SerializeMessage(message), &builder);
  } else {
    builder.Append(
        Format("{}: {}\r\n", kRpcHttpHeaderSeqNo, meta.correlation_id()));
    builder.Append(
        Format("{}: {}\r\n", kRpcHttpHeaderErrorCode, resp_meta.status()));
    builder.Append("Content-Length: 0\r\n\r\n"sv);
  }

  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteStreamContinue(const ProtoMessage& message,
                                                NoncontiguousBuffer* buffer,
                                                Controller* controller) {
  NoncontiguousBufferBuilder builder;
  AppendChunk(SerializeMessage(message), &builder);
  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteStreamEnd(const ProtoMessage& message,
                                           NoncontiguousBuffer* buffer,
                                           Controller* controller) {
  auto&& meta = message.meta->response_meta();
  NoncontiguousBufferBuilder builder;
  builder.Append(Format(
      "0\r\n{}\r\n", EncodeStreamTrailer(meta.status(), meta.description())));
  *buffer = builder.DestructiveGet();
}

void ProtoOverHttpProtocol::WriteError(const EarlyErrorMessage& message,
                                       NoncontiguousBuffer* buffer,
                                       Controller* controller) {
  auto body = message.GetDescription();
  NoncontiguousBufferBuilder builder;
  if (message.GetStatus() == rpc::STATUS_METHOD_NOT_FOUND) {
    builder.Append("HTTP/1.1 404 Not Found\r\n");
  } else {
    builder.Append("HTTP/1.1 400 Bad Request\r\n");
  }
  builder.Append(Format("Content-Type: {}\r\n", expecting_content_type_));
  builder.Append(Format("Content-Length: {}\r\n\r\n", body.size()));
  builder.Append(body);

  *buffer = builder.DestructiveGet();
}

NoncontiguousBuffer ProtoOverHttpProtocol::SerializeMessage(
    const ProtoMessage& msg) const {
  if (content_type_ == ContentType::kProtobuf) {
    // Most-frequently case, optimized.
    NoncontiguousBufferBuilder nbb;
    WriteTo(msg.msg_or_buffer, &nbb);
    return nbb.DestructiveGet();
  }
  if (msg.msg_or_buffer.index() == 0) {
    return {};
  } else if (msg.msg_or_buffer.index() == 1) {
    auto&& pb = *std::get<1>(msg.msg_or_buffer);
    std::string rc;
    if (content_type_ == ContentType::kApplicationJson) {
      FLARE_CHECK(ProtoMessageToJson(pb, &rc, nullptr));
    } else if (content_type_ == ContentType::kProto3Json) {
      FLARE_CHECK(google::protobuf::util::MessageToJsonString(pb, &rc).ok());
    } else if (content_type_ == ContentType::kDebugString) {
      rc = pb.ShortDebugString();
    } else {
      FLARE_UNREACHABLE();
    }
    return CreateBufferSlow(rc);
  } else if (msg.msg_or_buffer.index() == 2) {
    // Given that the user is not aware of `Accept-Type` from the remote side,
    // this likely leads to deserialization failure at remote side.
    return std::get<2>(msg.msg_or_buffer);
  } else {
    FLARE_UNREACHABLE();
  }
}

ProtoOverHttpProtocol::MessageCutStatus
ProtoOverHttpProtocol::TryKeepParsingStream(NoncontiguousBuffer* buffer,
                                            std::unique_ptr<Message>* msg) {
  FLARE_CHECK(!!current_stream_);  // There must be.
  FLARE_CHECK(!server_side_);      // Only response stream was supported by
                                   // HTTP-based protocols.
  NoncontiguousBuffer body;
  std::string trailer;
  auto result = ReadNextChunk(buffer, &body, &trailer);
  if (result == 0) {
    return MessageCutStatus::NeedMore;
  } else if (result < 0) {
    return MessageCutStatus::Error;
  }

  auto meta = object_pool::Get<rpc::RpcMeta>();

  *meta = *current_stream_;
  meta->set_method_type(rpc::METHOD_TYPE_STREAM);
  if (!trailer.empty()) {
    // There's no body to deserialize for the last message.
    FLARE_CHECK(body.Empty());

    meta->set_flags(rpc::MESSAGE_FLAGS_END_OF_STREAM);
    UpdateRpcMetaFromTrailer(trailer, meta.Get());
    current_stream_ = std::nullopt;  // This is the last chunk. Stream ended.
  }
  *msg = std::make_unique<OnWireMessage>(std::move(meta), std::move(body));
  return MessageCutStatus::Cut;
}

}  // namespace flare::protobuf
